{{!

  Copyright (c) Facebook, Inc. and its affiliates.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.

}}
{{> AutoGenerated}}

package {{service:javaPackage}};

import static com.facebook.swift.service.SwiftConstants.STICKY_HASH_KEY;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.thrift.protocol.*;
import org.apache.thrift.ClientPushMetadata;
import org.apache.thrift.InteractionCreate;
import org.apache.thrift.InteractionTerminate;
import com.facebook.thrift.client.ResponseWrapper;
import com.facebook.thrift.client.RpcOptions;

{{#service:disableReactive?}}// Generation of this class is disabled using (java.swift.disable_reactive){{/service:disableReactive?}}{{^service:disableReactive?}}
public class {{service:javaCapitalName}}ReactiveClient {{#service:extends}} extends {{service:javaPackage}}.{{service:javaCapitalName}}ReactiveClient{{/service:extends}}
  implements {{service:javaCapitalName}}.Reactive {
  private static final AtomicLong _interactionCounter = new AtomicLong(0);

  private final org.apache.thrift.ProtocolId _protocolId;
  private final reactor.core.publisher.Mono<? extends com.facebook.thrift.client.RpcClient> _rpcClient;
  private final Map<String, String> _headers;
  private final Map<String, String> _persistentHeaders;
  private final Set<Long> _activeInteractions;

  {{#service:supportedFunctions}}
  {{#function:args}}
  private static final TField _{{function:javaName}}_{{field:javaTFieldName}} = new TField("{{field:name}}", TType.{{#field:type}}{{> TType}}{{/field:type}}, (short){{field:key}});
  {{/function:args}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
  {{#function:exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER{{field:index}} =
  oprot -> {
            try {
              {{#field:type}}
              {{> ReadReturnType }}

              return _r;
              {{/field:type}}
            } catch (Throwable _e) {
              throw reactor.core.Exceptions.propagate(_e);
            }
          };

  {{/function:exceptions}}
  {{/service:supportedFunctions}}
  {{#service:streamingFunctions}}
  {{#function:args}}
  private static final TField _{{function:javaName}}_{{field:javaTFieldName}} = new TField("{{field:name}}", TType.{{#field:type}}{{> TType}}{{/field:type}}, (short){{field:key}});
  {{/function:args}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_STREAM_EXCEPTION_READERS = {{#function:stream_exceptions?}}new HashMap<>();{{/function:stream_exceptions?}}{{^function:stream_exceptions?}}java.util.Collections.emptyMap();{{/function:stream_exceptions?}}
  {{#function:exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER{{field:index}} =
  oprot -> {
            try {
              {{#field:type}}
              {{> ReadReturnType }}

              return _r;
              {{/field:type}}
            } catch (Throwable _e) {
              throw reactor.core.Exceptions.propagate(_e);
            }
          };

  {{/function:exceptions}}
  {{#function:stream_exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_STREAM_EXCEPTION_READER{{field:index}} =
  oprot -> {
            try {
              {{#field:type}}
              {{> ReadReturnType }}

              return _r;
              {{/field:type}}
            } catch (Throwable _e) {
              throw reactor.core.Exceptions.propagate(_e);
            }
          };

  {{/function:stream_exceptions}}
  {{/service:streamingFunctions}}
  {{#service:sinkFunctions}}
  {{#function:args}}
  private static final TField _{{function:javaName}}_{{field:javaTFieldName}} = new TField("{{field:name}}", TType.{{#field:type}}{{> TType}}{{/field:type}}, (short){{field:key}});
  {{/function:args}}
  {{#function:return_type}}{{#type:sink_elem_type}}
  private static final TField _{{function:javaName}}_SINK_TFIELD = new TField("payload", TType.{{> TType}}, (short)0);
  {{/type:sink_elem_type}}{{/function:return_type}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_STREAM_EXCEPTION_READERS = {{#function:sink_final_response_exceptions?}}new HashMap<>();{{/function:sink_final_response_exceptions?}}{{^function:sink_final_response_exceptions?}}java.util.Collections.emptyMap();{{/function:sink_final_response_exceptions?}}
  {{#function:exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER{{field:index}} =
  oprot -> {
            try {
              {{#field:type}}
              {{> ReadReturnType }}

              return _r;
              {{/field:type}}
            } catch (Throwable _e) {
              throw reactor.core.Exceptions.propagate(_e);
            }
          };

  {{/function:exceptions}}
  {{#function:sink_final_response_exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_STREAM_EXCEPTION_READER{{field:index}} =
  oprot -> {
            try {
              {{#field:type}}
              {{> ReadReturnType }}

              return _r;
              {{/field:type}}
            } catch (Throwable _e) {
              throw reactor.core.Exceptions.propagate(_e);
            }
          };

  {{/function:sink_final_response_exceptions}}
  {{/service:sinkFunctions}}
  {{#service:interactions}}
  {{#service:supportedFunctions}}
  {{#function:args}}
  private static final TField _{{function:javaName}}_{{field:javaTFieldName}}_INT = new TField("{{field:name}}", TType.{{#field:type}}{{> TType}}{{/field:type}}, (short){{field:key}});
  {{/function:args}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS_INT = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
  {{#function:exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER_INT{{field:index}} =
  oprot -> {
            try {
              {{#field:type}}
              {{> ReadReturnType }}

              return _r;
              {{/field:type}}
            } catch (Throwable _e) {
              throw reactor.core.Exceptions.propagate(_e);
            }
          };

  {{/function:exceptions}}
  {{/service:supportedFunctions}}
  {{#service:streamingFunctions}}
  {{#function:args}}
  private static final TField _{{function:javaName}}_{{field:javaTFieldName}}_INT = new TField("{{field:name}}", TType.{{#field:type}}{{> TType}}{{/field:type}}, (short){{field:key}});
  {{/function:args}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS_INT = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
  private static final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_STREAM_EXCEPTION_READERS_INT = {{#function:stream_exceptions?}}new HashMap<>();{{/function:stream_exceptions?}}{{^function:stream_exceptions?}}java.util.Collections.emptyMap();{{/function:stream_exceptions?}}
  {{#function:exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER_INT{{field:index}} =
  oprot -> {
            try {
              {{#field:type}}
              {{> ReadReturnType }}

              return _r;
              {{/field:type}}
            } catch (Throwable _e) {
              throw reactor.core.Exceptions.propagate(_e);
            }
          };

  {{/function:exceptions}}
  {{#function:stream_exceptions}}
  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_STREAM_EXCEPTION_READER_INT{{field:index}} =
  oprot -> {
            try {
              {{#field:type}}
              {{> ReadReturnType }}

              return _r;
              {{/field:type}}
            } catch (Throwable _e) {
              throw reactor.core.Exceptions.propagate(_e);
            }
          };

  {{/function:stream_exceptions}}
  {{/service:streamingFunctions}}
  {{/service:interactions}}

  static {
    {{#service:supportedFunctions}}{{#function:exceptions}}
    _{{function:javaName}}_EXCEPTION_READERS.put((short){{field:key}}, _{{function:javaName}}_EXCEPTION_READER{{field:index}});
    {{/function:exceptions}}{{/service:supportedFunctions}}
    {{#service:streamingFunctions}}{{#function:exceptions}}
    _{{function:javaName}}_EXCEPTION_READERS.put((short){{field:key}}, _{{function:javaName}}_EXCEPTION_READER{{field:index}});
    {{/function:exceptions}}{{#function:stream_exceptions}}
    _{{function:javaName}}_STREAM_EXCEPTION_READERS.put((short){{field:key}}, _{{function:javaName}}_STREAM_EXCEPTION_READER{{field:index}});
    {{/function:stream_exceptions}}{{/service:streamingFunctions}}
    {{#service:sinkFunctions}}{{#function:exceptions}}
    _{{function:javaName}}_EXCEPTION_READERS.put((short){{field:key}}, _{{function:javaName}}_EXCEPTION_READER{{field:index}});
    {{/function:exceptions}}{{#function:sink_final_response_exceptions}}
    _{{function:javaName}}_STREAM_EXCEPTION_READERS.put((short){{field:key}}, _{{function:javaName}}_STREAM_EXCEPTION_READER{{field:index}});
    {{/function:sink_final_response_exceptions}}{{/service:sinkFunctions}}
  }

  public {{service:javaCapitalName}}ReactiveClient(org.apache.thrift.ProtocolId _protocolId, reactor.core.publisher.Mono<? extends com.facebook.thrift.client.RpcClient> _rpcClient) {
    {{#service:extends}}super(_protocolId, _rpcClient);{{/service:extends}}
    this._protocolId = _protocolId;
    this._rpcClient = _rpcClient;
    this._headers = java.util.Collections.emptyMap();
    this._persistentHeaders = java.util.Collections.emptyMap();
    this._activeInteractions = ConcurrentHashMap.newKeySet();
  }

  public {{service:javaCapitalName}}ReactiveClient(org.apache.thrift.ProtocolId _protocolId, reactor.core.publisher.Mono<? extends com.facebook.thrift.client.RpcClient> _rpcClient, Map<String, String> _headers, Map<String, String> _persistentHeaders) {
    this(_protocolId, _rpcClient, _headers, _persistentHeaders, new AtomicLong(), ConcurrentHashMap.newKeySet());
  }

  public {{service:javaCapitalName}}ReactiveClient(org.apache.thrift.ProtocolId _protocolId, reactor.core.publisher.Mono<? extends com.facebook.thrift.client.RpcClient> _rpcClient, Map<String, String> _headers, Map<String, String> _persistentHeaders, AtomicLong interactionCounter, Set<Long> activeInteractions) {
    {{#service:extends}}super(_protocolId, _rpcClient);{{/service:extends}}
    this._protocolId = _protocolId;
    this._rpcClient = _rpcClient;
    this._headers = _headers;
    this._persistentHeaders = _persistentHeaders;
    this._activeInteractions = activeInteractions;
  }

  @java.lang.Override
  public void dispose() {}

  {{#service:supportedFunctions}}
  private com.facebook.thrift.payload.Writer _create{{function:javaName}}Writer({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
    return oprot -> {
      try {
        {{#function:args}}
        {{#field:type}}
        {
          oprot.writeFieldBegin(_{{function:javaName}}_{{field:javaTFieldName}});

          {{> FieldType}} _iter0 = {{field:javaName}};

          {{#field:type}}
          {{field:nestedDepth++}}{{> WriteRequestType}}{{field:nestedDepth--}}
          {{/field:type}}
          oprot.writeFieldEnd();
        }
        {{/field:type}}

        {{/function:args}}

      } catch (Throwable _e) {
        throw reactor.core.Exceptions.propagate(_e);
      }
    };
  }

  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_READER =
    oprot -> {
              try {
                {{^function:voidType}}
                {{#function:return_type}}
                {{> ReadReturnType }}

                return _r;
                {{/function:return_type}}
                {{/function:voidType}}

                {{#function:voidType}}
                return null;
                {{/function:voidType}}

              } catch (Throwable _e) {
                throw reactor.core.Exceptions.propagate(_e);
              }
            };


  @java.lang.Override
  public reactor.core.publisher.Mono<com.facebook.thrift.client.ResponseWrapper<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}>> {{function:javaName}}Wrapper({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
    return _rpcClient
      .flatMap(_rpc -> {
        org.apache.thrift.RequestRpcMetadata _metadata = new org.apache.thrift.RequestRpcMetadata.Builder()
                .setName("{{function:javaName}}")
                .setKind(org.apache.thrift.RpcKind.{{#function:oneway?}}SINGLE_REQUEST_NO_RESPONSE{{/function:oneway?}}{{^function:oneway?}}SINGLE_REQUEST_SINGLE_RESPONSE{{/function:oneway?}})
                .setOtherMetadata(getHeaders(rpcOptions))
                .setProtocol(_protocolId)
                .build();

            com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> _crp =
                com.facebook.thrift.payload.ClientRequestPayload.create(
                    _create{{function:javaName}}Writer({{#function:args}}{{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}),
                    _{{function:javaName}}_READER,
                    _{{function:javaName}}_EXCEPTION_READERS,
                    _metadata,
                    java.util.Collections.emptyMap());

            return _rpc
                {{#function:oneway?}}.singleRequestNoResponse(_crp, rpcOptions).thenReturn(ResponseWrapper.create(null, java.util.Collections.emptyMap(), java.util.Collections.emptyMap())){{/function:oneway?}}{{!
                }}{{^function:oneway?}}.singleRequestSingleResponse(_crp, rpcOptions).doOnNext(_p -> {if(_p.getException() != null) throw reactor.core.Exceptions.propagate(_p.getException());}){{/function:oneway?}};
      });
  }

  @java.lang.Override
  public reactor.core.publisher.Mono<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
    return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} rpcOptions){{#function:oneway?}}.then(){{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{^function:voidType}}.map(_p -> _p.getData()){{/function:voidType}}{{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{#function:voidType}}.then(){{/function:voidType}}{{/function:oneway?}};
  }

  @java.lang.Override
  public reactor.core.publisher.Mono<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
    return {{function:javaName}}({{#function:args}}{{field:javaName}}, {{/function:args}} com.facebook.thrift.client.RpcOptions.EMPTY);
  }

  {{/service:supportedFunctions}}
  {{#service:streamingFunctions}}
  private com.facebook.thrift.payload.Writer _create{{function:javaName}}Writer({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
    return oprot -> {
      try {
        {{#function:args}}
        {{#field:type}}
        {
          oprot.writeFieldBegin(_{{function:javaName}}_{{field:javaTFieldName}});

          {{> FieldType}} _iter0 = {{field:javaName}};

          {{#field:type}}
          {{field:nestedDepth++}}{{> WriteRequestType}}{{field:nestedDepth--}}
          {{/field:type}}
          oprot.writeFieldEnd();
        }
        {{/field:type}}

        {{/function:args}}

      } catch (Throwable _e) {
        throw reactor.core.Exceptions.propagate(_e);
      }
    };
  }

  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_READER =
    oprot -> {
              try {
                {{^function:voidType}}
                {{#function:return_type}}{{#type:stream_elem_type}}
                {{> ReadReturnType }}

                return _r;
                {{/type:stream_elem_type}}{{/function:return_type}}
                {{/function:voidType}}

                {{#function:voidType}}
                return null;
                {{/function:voidType}}

              } catch (Throwable _e) {
                throw reactor.core.Exceptions.propagate(_e);
              }
            };
    {{#function:return_type}}{{#type:stream_has_first_response?}}

    private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_FIRST_READER =
    oprot -> {
              try {
                {{^function:voidType}}
                {{#function:return_type}}{{#type:stream_first_response_type}}
                {{> ReadReturnType }}

                return _r;
                {{/type:stream_first_response_type}}{{/function:return_type}}
                {{/function:voidType}}

                {{#function:voidType}}
                return null;
                {{/function:voidType}}

              } catch (Throwable _e) {
                throw reactor.core.Exceptions.propagate(_e);
              }
            };
  {{/type:stream_has_first_response?}}{{/function:return_type}}

  @java.lang.Override
  public reactor.core.publisher.Flux<com.facebook.thrift.client.ResponseWrapper<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}>> {{function:javaName}}Wrapper({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
    return _rpcClient
      .flatMapMany(_rpc -> {
        org.apache.thrift.RequestRpcMetadata _metadata = new org.apache.thrift.RequestRpcMetadata.Builder()
                .setName("{{function:javaName}}")
                .setKind(org.apache.thrift.RpcKind.SINGLE_REQUEST_STREAMING_RESPONSE)
                .setOtherMetadata(getHeaders(rpcOptions))
                .setProtocol(_protocolId)
                .build();

            com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{#type:stream_elem_type}}{{> BoxedType}}{{/type:stream_elem_type}}{{/function:return_type}}> _crp =
                com.facebook.thrift.payload.ClientRequestPayload.create(
                    _create{{function:javaName}}Writer({{#function:args}}{{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}),
                    _{{function:javaName}}_READER,{{#function:return_type}}{{#type:stream_has_first_response?}}
                    _{{function:javaName}}_FIRST_READER,{{/type:stream_has_first_response?}}{{/function:return_type}}
                    _{{function:javaName}}_EXCEPTION_READERS,
                    _{{function:javaName}}_STREAM_EXCEPTION_READERS,
                    _metadata,
                    java.util.Collections.emptyMap());

            return _rpc
                .singleRequestStreamingResponse(_crp, rpcOptions)
                .doOnNext(_p -> {if(_p.getException() != null) throw reactor.core.Exceptions.propagate(_p.getException());}){{#function:return_type}}{{^type:stream_has_first_response?}}
                .filter((_p) -> ((com.facebook.thrift.model.StreamResponse)_p.getData()).isSetData()){{/type:stream_has_first_response?}}{{/function:return_type}}
                .map(_p -> ResponseWrapper.create((({{#function:return_type}}{{> StreamResponseGeneric}}{{/function:return_type}})_p.getData()){{#function:return_type}}{{^type:stream_has_first_response?}}.getData(){{/type:stream_has_first_response?}}{{/function:return_type}}, _p.getHeaders(), _p.getBinaryHeaders()));
      });
  }

  @java.lang.Override
  public reactor.core.publisher.Flux<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
      return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} rpcOptions).map(_p -> _p.getData());
  }

  @java.lang.Override
  public reactor.core.publisher.Flux<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
    return {{function:javaName}}({{#function:args}}{{field:javaName}}, {{/function:args}} com.facebook.thrift.client.RpcOptions.EMPTY);
  }

  {{/service:streamingFunctions}}

  {{#service:sinkFunctions}}
  private com.facebook.thrift.payload.Writer _create{{function:javaName}}Writer({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
    return oprot -> {
      try {
        {{#function:args}}
        {{#field:type}}
        {
          oprot.writeFieldBegin(_{{function:javaName}}_{{field:javaTFieldName}});

          {{> FieldType}} _iter0 = {{field:javaName}};

          {{#field:type}}
          {{field:nestedDepth++}}{{> WriteRequestType}}{{field:nestedDepth--}}
          {{/field:type}}
          oprot.writeFieldEnd();
        }
        {{/field:type}}

        {{/function:args}}

      } catch (Throwable _e) {
        throw reactor.core.Exceptions.propagate(_e);
      }
    };
  }

  private com.facebook.thrift.payload.Writer _create{{function:javaName}}SinkWriter({{#function:return_type}}{{#type:sink_elem_type}}{{ >BoxedType}} _p{{/type:sink_elem_type}}{{/function:return_type}}) {
    return oprot -> {
      try {
        oprot.writeFieldBegin(_{{function:javaName}}_SINK_TFIELD);
        {{#function:return_type}}{{#type:sink_elem_type}}
        {{> WriteSinkPayloadType}}
        {{/type:sink_elem_type}}{{/function:return_type}}

        oprot.writeFieldEnd();

      } catch (Throwable _e) {
        throw reactor.core.Exceptions.propagate(_e);
      }
    };
  }

  private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_READER =
    oprot -> {
              try {
                {{^function:voidType}}
                {{#function:return_type}}{{#type:sink_final_response_type}}
                {{> ReadReturnType }}

                return _r;
                {{/type:sink_final_response_type}}{{/function:return_type}}
                {{/function:voidType}}

                {{#function:voidType}}
                return null;
                {{/function:voidType}}

              } catch (Throwable _e) {
                throw reactor.core.Exceptions.propagate(_e);
              }
            };
    {{#function:return_type}}{{#type:sink_has_first_response?}}

    private static final com.facebook.thrift.payload.Reader _{{function:javaName}}_FIRST_READER =
    oprot -> {
              try {
                {{^function:voidType}}
                {{#function:return_type}}{{#type:sink_first_response_type}}
                {{> ReadReturnType }}

                return _r;
                {{/type:sink_first_response_type}}{{/function:return_type}}
                {{/function:voidType}}

                {{#function:voidType}}
                return null;
                {{/function:voidType}}

              } catch (Throwable _e) {
                throw reactor.core.Exceptions.propagate(_e);
              }
            };
  {{/type:sink_has_first_response?}}{{/function:return_type}}

  @java.lang.Override
  public {{#function:return_type}}{{> SinkWrapperReturnType}}{{/function:return_type}} {{function:javaName}}Wrapper({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} org.reactivestreams.Publisher<{{#function:return_type}}{{#type:sink_elem_type}}{{ >BoxedType}}{{/type:sink_elem_type}}{{/function:return_type}}> payloads, com.facebook.thrift.client.RpcOptions rpcOptions) {
    return _rpcClient
      .flatMapMany(_rpc -> {
        org.apache.thrift.RequestRpcMetadata _metadata = new org.apache.thrift.RequestRpcMetadata.Builder()
                .setName("{{function:javaName}}")
                .setKind(org.apache.thrift.RpcKind.SINK)
                .setOtherMetadata(getHeaders(rpcOptions))
                .setProtocol(_protocolId)
                .build();

            com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{#type:sink_final_response_type}}{{> BoxedType}}{{/type:sink_final_response_type}}{{/function:return_type}}> _crp =
                com.facebook.thrift.payload.ClientRequestPayload.create(
                    _create{{function:javaName}}Writer({{#function:args}}{{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}),
                    _{{function:javaName}}_READER,{{#function:return_type}}{{#type:sink_has_first_response?}}
                    _{{function:javaName}}_FIRST_READER,{{/type:sink_has_first_response?}}{{/function:return_type}}
                    _{{function:javaName}}_EXCEPTION_READERS,
                    _{{function:javaName}}_STREAM_EXCEPTION_READERS,
                    _metadata,
                    java.util.Collections.emptyMap());

            reactor.core.publisher.Flux<com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{#type:sink_final_response_type}}{{> BoxedType}}{{/type:sink_final_response_type}}{{/function:return_type}}>> _sink =
              reactor.core.publisher.Mono.just(_crp).concatWith(reactor.core.publisher.Flux.from(payloads)
                .map(_p -> com.facebook.thrift.payload.ClientRequestPayload.create(
                    _create{{function:javaName}}SinkWriter(_p),
                    _{{function:javaName}}_READER,{{#function:return_type}}{{#type:sink_has_first_response?}}
                    _{{function:javaName}}_FIRST_READER,{{/type:sink_has_first_response?}}{{/function:return_type}}
                    _{{function:javaName}}_EXCEPTION_READERS,
                    _{{function:javaName}}_STREAM_EXCEPTION_READERS,
                    _metadata,
                    java.util.Collections.emptyMap())));

            return _rpc
                .streamingRequestStreamingResponse(_sink, rpcOptions)
                .doOnNext(_p -> {if(_p.getException() != null) throw reactor.core.Exceptions.propagate(_p.getException());})
                .limitRequest(2){{#function:return_type}}{{^type:sink_has_first_response?}}
                .filter((_p) -> ((com.facebook.thrift.model.StreamResponse)_p.getData()).isSetData()){{/type:sink_has_first_response?}}{{/function:return_type}}
                .map(_p -> ResponseWrapper.create((({{#function:return_type}}{{> SinkResponseGeneric}}{{/function:return_type}})_p.getData()){{#function:return_type}}{{^type:sink_has_first_response?}}.getData(){{/type:sink_has_first_response?}}{{/function:return_type}}, _p.getHeaders(), _p.getBinaryHeaders()));
      }){{#function:return_type}}{{^type:sink_has_first_response?}}.single(){{/type:sink_has_first_response?}}{{/function:return_type}};
  }

  @java.lang.Override
  public {{#function:return_type}}{{> SinkReturnType}}{{/function:return_type}} {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} org.reactivestreams.Publisher<{{#function:return_type}}{{#type:sink_elem_type}}{{ >BoxedType}}{{/type:sink_elem_type}}{{/function:return_type}}> payloads, com.facebook.thrift.client.RpcOptions rpcOptions) {
      return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} payloads, rpcOptions).map(_p -> _p.getData());
  }

  @java.lang.Override
  public {{#function:return_type}}{{> SinkReturnType}}{{/function:return_type}} {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} org.reactivestreams.Publisher<{{#function:return_type}}{{#type:sink_elem_type}}{{ >BoxedType}}{{/type:sink_elem_type}}{{/function:return_type}}> payloads) {
      return {{function:javaName}}({{#function:args}}{{field:javaName}}, {{/function:args}} payloads, com.facebook.thrift.client.RpcOptions.EMPTY);
  }

  {{/service:sinkFunctions}}
  {{#service:interactions}}
  public class {{service:name}}Impl implements {{service:name}} {
    private final long interactionId;

    {{service:name}}Impl(long interactionId) {
      this.interactionId = interactionId;
    }

    {{#service:supportedFunctions}}
    private final java.util.Map<Short, com.facebook.thrift.payload.Reader> _{{function:javaName}}_EXCEPTION_READERS = {{#function:exceptions?}}new HashMap<>();{{/function:exceptions?}}{{^function:exceptions?}}java.util.Collections.emptyMap();{{/function:exceptions?}}
    {{#function:exceptions}}
    private final com.facebook.thrift.payload.Reader _{{function:javaName}}_EXCEPTION_READER_INT{{field:index}} =
    oprot -> {
              try {
                {{#field:type}}
                {{> ReadReturnType }}

                return _r;
                {{/field:type}}
              } catch (Throwable _e) {
                throw reactor.core.Exceptions.propagate(_e);
              }
            };

    {{/function:exceptions}}
    {{/service:supportedFunctions}}

    {{#service:supportedFunctions}}
    private com.facebook.thrift.payload.Writer _create{{function:javaName}}Writer({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
      return oprot -> {
        try {
          {{#function:args}}
          {{#field:type}}
          {
            oprot.writeFieldBegin(_{{function:javaName}}_{{field:javaTFieldName}}_INT);

            {{> FieldType}} _iter0 = {{field:javaName}};

            {{#field:type}}
            {{field:nestedDepth++}}{{> WriteRequestType}}{{field:nestedDepth--}}
            {{/field:type}}
            oprot.writeFieldEnd();
          }
          {{/field:type}}

          {{/function:args}}

        } catch (Throwable _e) {
          throw reactor.core.Exceptions.propagate(_e);
        }
      };
    }

    private final com.facebook.thrift.payload.Reader _{{function:javaName}}_READER =
      oprot -> {
                try {
                  {{^function:voidType}}
                  {{#function:return_type}}
                  {{> ReadReturnType }}

                  return _r;
                  {{/function:return_type}}
                  {{/function:voidType}}

                  {{#function:voidType}}
                  return null;
                  {{/function:voidType}}

                } catch (Throwable _e) {
                  throw reactor.core.Exceptions.propagate(_e);
                }
              };

    public reactor.core.publisher.Mono<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}
    final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}},{{/last?}}{{/function:args}}) {
      return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} com.facebook.thrift.client.RpcOptions.EMPTY){{#function:oneway?}}.then(){{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{^function:voidType}}.map(_p -> _p.getData()){{/function:voidType}}{{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{#function:voidType}}.then(){{/function:voidType}}{{/function:oneway?}};
    }

    @java.lang.Override
    public reactor.core.publisher.Mono<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}}RpcOptions rpcOptions)  {
      return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} rpcOptions){{#function:oneway?}}.then(){{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{^function:voidType}}.map(_p -> _p.getData()){{/function:voidType}}{{/function:oneway?}}{{!
        }}{{^function:oneway?}}{{#function:voidType}}.then(){{/function:voidType}}{{/function:oneway?}};
    }

    @java.lang.Override
    public reactor.core.publisher.Mono<ResponseWrapper<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}>> {{function:javaName}}Wrapper({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}}RpcOptions rpcOptions)  {
      return _rpcClient
        .subscriberContext(ctx -> ctx.put(STICKY_HASH_KEY, interactionId))
        .flatMap(_rpc -> {
          String interactionName = "{{service:name}}.{{function:javaName}}";
          org.apache.thrift.RequestRpcMetadata.Builder _metadataBuilder = new org.apache.thrift.RequestRpcMetadata.Builder()
                  .setName(interactionName)
                  .setKind(org.apache.thrift.RpcKind.{{#function:oneway?}}SINGLE_REQUEST_NO_RESPONSE{{/function:oneway?}}{{^function:oneway?}}SINGLE_REQUEST_SINGLE_RESPONSE{{/function:oneway?}})
                  .setOtherMetadata(getHeaders(rpcOptions))
                  .setProtocol(_protocolId);

          if (_activeInteractions.contains(interactionId)) {
            _metadataBuilder.setInteractionId(interactionId);
          } else {
            _metadataBuilder.setInteractionCreate(
              new InteractionCreate.Builder()
                  .setInteractionId(interactionId)
                  .setInteractionName("{{service:name}}")
                  .build());
            _metadataBuilder.setInteractionId(0L);
            _activeInteractions.add(interactionId);
          }

          org.apache.thrift.RequestRpcMetadata _metadata = _metadataBuilder.build();

          com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{> BoxedType}}{{/function:return_type}}> _crp =
              com.facebook.thrift.payload.ClientRequestPayload.create(
                  _create{{function:javaName}}Writer({{#function:args}}{{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}),
                  _{{function:javaName}}_READER,
                  _{{function:javaName}}_EXCEPTION_READERS_INT,
                  _metadata,
                  java.util.Collections.emptyMap());

          return _rpc
              {{#function:oneway?}}.singleRequestNoResponse(_crp, rpcOptions).thenReturn(ResponseWrapper.create(null, java.util.Collections.emptyMap(), java.util.Collections.emptyMap())){{/function:oneway?}}{{!
              }}{{^function:oneway?}}.singleRequestSingleResponse(_crp, rpcOptions).doOnNext(_p -> {if(_p.getException() != null) throw reactor.core.Exceptions.propagate(_p.getException());}){{/function:oneway?}};
        });
    }

    {{/service:supportedFunctions}}
    {{#service:streamingFunctions}}
    private com.facebook.thrift.payload.Writer _create{{function:javaName}}Writer({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
      return oprot -> {
        try {
          {{#function:args}}
          {{#field:type}}
          {
            oprot.writeFieldBegin(_{{function:javaName}}_{{field:javaTFieldName}}_INT);

            {{> FieldType}} _iter0 = {{field:javaName}};

            {{#field:type}}
            {{field:nestedDepth++}}{{> WriteRequestType}}{{field:nestedDepth--}}
            {{/field:type}}
            oprot.writeFieldEnd();
          }
          {{/field:type}}

          {{/function:args}}

        } catch (Throwable _e) {
          throw reactor.core.Exceptions.propagate(_e);
        }
      };
    }

    private final com.facebook.thrift.payload.Reader _{{function:javaName}}_READER =
      oprot -> {
                try {
                  {{^function:voidType}}
                  {{#function:return_type}}{{#type:stream_elem_type}}
                  {{> ReadReturnType }}

                  return _r;
                  {{/type:stream_elem_type}}{{/function:return_type}}
                  {{/function:voidType}}

                  {{#function:voidType}}
                  return null;
                  {{/function:voidType}}

                } catch (Throwable _e) {
                  throw reactor.core.Exceptions.propagate(_e);
                }
              };
      {{#function:return_type}}{{#type:stream_has_first_response?}}

      private final com.facebook.thrift.payload.Reader _{{function:javaName}}_FIRST_READER =
      oprot -> {
                try {
                  {{^function:voidType}}
                  {{#function:return_type}}{{#type:stream_first_response_type}}
                  {{> ReadReturnType }}

                  return _r;
                  {{/type:stream_first_response_type}}{{/function:return_type}}
                  {{/function:voidType}}

                  {{#function:voidType}}
                  return null;
                  {{/function:voidType}}

                } catch (Throwable _e) {
                  throw reactor.core.Exceptions.propagate(_e);
                }
              };
    {{/type:stream_has_first_response?}}{{/function:return_type}}

    @java.lang.Override
    public reactor.core.publisher.Flux<com.facebook.thrift.client.ResponseWrapper<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}>> {{function:javaName}}Wrapper({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
      return _rpcClient
        .subscriberContext(ctx -> ctx.put(STICKY_HASH_KEY, interactionId))
        .flatMapMany(_rpc -> {
          String interactionName = "{{service:name}}.{{function:javaName}}";
          org.apache.thrift.RequestRpcMetadata.Builder _metadataBuilder = new org.apache.thrift.RequestRpcMetadata.Builder()
                  .setName(interactionName)
                  .setKind(org.apache.thrift.RpcKind.SINGLE_REQUEST_STREAMING_RESPONSE)
                  .setOtherMetadata(getHeaders(rpcOptions))
                  .setProtocol(_protocolId);

          if (_activeInteractions.contains(interactionId)) {
            _metadataBuilder.setInteractionId(interactionId);
          } else {
            _metadataBuilder.setInteractionCreate(
              new InteractionCreate.Builder()
                  .setInteractionId(interactionId)
                  .setInteractionName("{{service:name}}")
                  .build());
            _metadataBuilder.setInteractionId(0L);
            _activeInteractions.add(interactionId);
          }


          com.facebook.thrift.payload.ClientRequestPayload<{{#function:return_type}}{{#type:stream_elem_type}}{{> BoxedType}}{{/type:stream_elem_type}}{{/function:return_type}}> _crp =
              com.facebook.thrift.payload.ClientRequestPayload.create(
                  _create{{function:javaName}}Writer({{#function:args}}{{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}),
                  _{{function:javaName}}_READER,{{#function:return_type}}{{#type:stream_has_first_response?}}
                  _{{function:javaName}}_FIRST_READER,{{/type:stream_has_first_response?}}{{/function:return_type}}
                  _{{function:javaName}}_EXCEPTION_READERS_INT,
                  _{{function:javaName}}_STREAM_EXCEPTION_READERS_INT,
                  _metadataBuilder.build(),
                  java.util.Collections.emptyMap());

          return _rpc
              .singleRequestStreamingResponse(_crp, rpcOptions)
              .doOnNext(_p -> {if(_p.getException() != null) throw reactor.core.Exceptions.propagate(_p.getException());}){{#function:return_type}}{{^type:stream_has_first_response?}}
              .filter((_p) -> ((com.facebook.thrift.model.StreamResponse)_p.getData()).isSetData()){{/type:stream_has_first_response?}}{{/function:return_type}}
              .map(_p -> ResponseWrapper.create((({{#function:return_type}}{{> StreamResponseGeneric}}{{/function:return_type}})_p.getData()){{#function:return_type}}{{^type:stream_has_first_response?}}.getData(){{/type:stream_has_first_response?}}{{/function:return_type}}, _p.getHeaders(), _p.getBinaryHeaders()));
        });
    }

    @java.lang.Override
    public reactor.core.publisher.Flux<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}, {{/function:args}} final com.facebook.thrift.client.RpcOptions rpcOptions) {
        return {{function:javaName}}Wrapper({{#function:args}}{{field:javaName}}, {{/function:args}} rpcOptions).map(_p -> _p.getData());
    }

    @java.lang.Override
    public reactor.core.publisher.Flux<{{#function:return_type}}{{> StreamReturnType}}{{/function:return_type}}> {{function:javaName}}({{#function:args}}final {{#field:type}}{{> Type}}{{/field:type}} {{field:javaName}}{{^last?}}, {{/last?}}{{/function:args}}) {
      return {{function:javaName}}({{#function:args}}{{field:javaName}}, {{/function:args}} com.facebook.thrift.client.RpcOptions.EMPTY);
    }

    {{/service:streamingFunctions}}
    @java.lang.Override
    public void dispose() {
      _activeInteractions.remove(interactionId);
      _rpcClient
        .subscriberContext(ctx -> ctx.put(STICKY_HASH_KEY, interactionId))
        .flatMap(_rpc -> {
          InteractionTerminate term = new InteractionTerminate.Builder().setInteractionId(interactionId).build();
          ClientPushMetadata metadata = ClientPushMetadata.fromInteractionTerminate(term);
          return _rpc.metadataPush(metadata, com.facebook.thrift.client.RpcOptions.EMPTY);
        }).subscribe();
    }
  }

  public {{service:name}} create{{service:name}}() {
      return new {{service:name}}Impl(_interactionCounter.incrementAndGet());
  }
  {{^last?}}

  {{/last?}}
  {{/service:interactions}}

  private Map<String, String> getHeaders(com.facebook.thrift.client.RpcOptions rpcOptions) {
      Map<String, String> headers = new HashMap<>();
      if (rpcOptions.getRequestHeaders() != null && !rpcOptions.getRequestHeaders().isEmpty()) {
          headers.putAll(rpcOptions.getRequestHeaders());
      }
      if (_headers != null && !_headers.isEmpty()) {
          headers.putAll(_headers);
      }
      if (_persistentHeaders != null && !_persistentHeaders.isEmpty()) {
          headers.putAll(_persistentHeaders);
      }
      return headers;
  }
}
{{/service:disableReactive?}}
